
feat: streaming column-major single-RG merge engine (PR-6b) #6409

Draft

g-talbot wants to merge 2 commits into gtt/parquet-page-decoder from gtt/streaming-merge-engine-merger


Conversation


@g-talbot g-talbot commented May 8, 2026

Summary

  • New merge::streaming::streaming_merge_sorted_parquet_files: async N-input → M-output merge consuming inputs as Box<dyn ColumnPageStream> and writing each output column-by-column via StreamingParquetWriter (single row group per output); a signature sketch follows this list.
  • Inputs drained per-RG via PR-6a's StreamDecoder, then concatenated per input. Per-RG decode memory is bounded; per-input concat matches the existing engine's input shape for the merge planner.
  • Reuses the existing permutation / KV metadata / sorting columns / MC-3 sort-order helpers (super::writer) and union schema / output-optimisation helpers (super::schema) — now pub(super). PR-7 folds the non-streaming path away.
  • Multi-RG output at metric_name boundaries lands in PR-6c.
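
A hedged sketch of the entry point's shape, inferred from the bullets above. `ColumnPageStream` and `StreamingParquetWriter` are the real PR-5a / PR-2 names, but their definitions here are empty stand-ins, and the error type is invented for illustration:

```rust
use std::io::Write;

// Stand-ins for PR-5a's trait and PR-2's writer; the real definitions differ.
trait ColumnPageStream: Send {}
struct StreamingParquetWriter<W: Write>(W);

#[derive(Debug)]
struct MergeError; // hypothetical error type

// N sorted inputs in, M single-row-group outputs out, each output written
// column-by-column through its streaming writer.
async fn streaming_merge_sorted_parquet_files<W: Write + Send>(
    inputs: Vec<Box<dyn ColumnPageStream>>,
    outputs: Vec<StreamingParquetWriter<W>>,
) -> Result<(), MergeError> {
    // Real body: drain each input per-RG, plan the merge, then flush
    // each output column chunk as soon as it is encoded.
    let _ = (inputs, outputs);
    Ok(())
}
```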

Why the win is on the output side

The standard ArrowWriter materialises a full row group's worth of column-chunk buffers before serialising. PR-2's StreamingParquetWriter flushes one column chunk at a time, so output peak memory is bounded by the largest single column chunk plus bookkeeping (page index, bloom filters), not by the row group total.
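
A minimal self-contained sketch of that difference, with a hypothetical `encode_column` closure standing in for the real encoder: only one encoded chunk is alive at a time, so peak memory is the max over columns rather than their sum.

```rust
struct EncodedChunk(Vec<u8>); // one encoded column chunk

// Writes a row group column-by-column. Peak memory ≈ the largest single
// chunk, because each chunk is dropped before the next one is encoded;
// a buffer-the-whole-row-group writer would hold all `num_columns` at once.
fn write_row_group_streaming<W: std::io::Write>(
    sink: &mut W,
    mut encode_column: impl FnMut(usize) -> EncodedChunk,
    num_columns: usize,
) -> std::io::Result<()> {
    for col in 0..num_columns {
        let chunk = encode_column(col);
        sink.write_all(&chunk.0)?;
    } // `chunk` dropped here on each iteration
    Ok(())
}
```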

The input side stays at "drain whole input into a RecordBatch per input" because per-RG streaming through the merge driver requires synchronising RG boundaries across inputs, which only works once prefix=1 multi-RG inputs are the dominant compaction case. PR-6b is correct without it; the input-side streaming optimisation is a follow-up.
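
A hedged sketch of that input-side drain. `RowGroupSource` is a hypothetical stand-in for PR-6a's `StreamDecoder` drain surface (the real method names may differ); `concat_batches` is arrow's real helper:

```rust
use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Hypothetical stand-in for PR-6a's `StreamDecoder` drain API.
trait RowGroupSource {
    fn schema(&self) -> Arc<Schema>;
    /// Decode the next row group, or return None when the input is exhausted.
    fn next_row_group(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
}

/// Drain one input RG-by-RG (bounding decode memory to one row group at a
/// time), then concatenate so the merge planner sees one batch per input.
fn drain_input(src: &mut dyn RowGroupSource) -> Result<RecordBatch, ArrowError> {
    let mut batches = Vec::new();
    while let Some(batch) = src.next_row_group()? {
        batches.push(batch);
    }
    concat_batches(&src.schema(), &batches)
}
```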

Stack

main
└── gtt/parquet-streaming-base (= main ∪ PR-2 #6384 ∪ PR-4 #6386)
    └── gtt/column-page-stream-trait (PR-5a #6406)
        ├── gtt/legacy-input-adapter (PR-5  #6408)
        └── gtt/parquet-page-decoder  (PR-6a #6407)
            └── gtt/streaming-merge-engine-merger ← PR-6b (this PR)
                └── gtt/streaming-merge-engine-multi-rg (PR-6c — next)

Test plan

  • test_two_inputs_simple_merge — total row count + ascending sorted_series
  • test_output_is_single_row_group — PR-6b's contract: every output is exactly 1 RG (assertion sketched after this list)
  • test_total_rows_preserved — MC-1 invariant
  • test_sort_schema_mismatch_rejected — input metadata validation
  • test_window_start_mismatch_rejected — same
  • test_output_has_page_index_metadata — column index + offset index present (query pruning)
  • test_kv_metadata_propagated_to_output — qh.sort_fields, qh.window_start, qh.window_duration carry through; qh.num_merge_ops incremented
  • test_all_empty_inputs_no_output
  • test_one_empty_input_among_nonempty
  • test_output_drainable_by_stream_decoder — round-trip output → StreamDecoder → row count
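
A minimal sketch of the single-RG contract assertion from test_output_is_single_row_group above, assuming the merged output bytes are already in hand (the merge call itself is elided); `SerializedFileReader` and `num_row_groups` are parquet's real metadata API:

```rust
use bytes::Bytes;
use parquet::file::reader::{FileReader, SerializedFileReader};

// PR-6b's contract: every output file carries exactly one row group.
fn assert_single_row_group(output: Bytes) {
    let reader = SerializedFileReader::new(output).expect("valid parquet output");
    assert_eq!(reader.metadata().num_row_groups(), 1);
}
```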

CI gates locally green: clippy --workspace --all-features --tests with -Dwarnings, nightly fmt --check, cargo doc --no-deps, cargo machete, license headers, log format, typos. 411/411 crate tests pass.

🤖 Generated with Claude Code

g-talbot and others added 2 commits May 8, 2026 09:32
Adds `merge::streaming::streaming_merge_sorted_parquet_files`, an
async N-input → M-output merge that consumes inputs as
`Box<dyn ColumnPageStream>` (PR-5a's trait) and writes each output
column-by-column via `StreamingParquetWriter` (PR-2). Each output is
single row group; multi-RG output at metric_name boundaries lands in
PR-6c.

Compared to the existing whole-file engine, the win is on the output
side: the standard `ArrowWriter` materialises a full row group's worth
of column-chunk buffers before serialising, whereas this writer
flushes one column chunk at a time, so output peak memory is bounded
by the largest single column chunk plus bookkeeping (page index,
bloom filters), not by the row group total.

Inputs are drained one row group at a time via PR-6a's
`StreamDecoder`, then concatenated per input. Per-RG decode memory
is bounded; the per-input concat matches the existing engine's input
shape for the merge planner. Truly per-RG input streaming (one input
RG at a time across all inputs) lands when prefix=1 multi-RG inputs
become the dominant compaction path — PR-6b is correct without it.

Reuses the existing permutation, KV metadata, sorting columns, MC-3
sort-order check (`super::writer`), and union schema / output
optimisation (`super::schema`) helpers — they're now `pub(super)` so
both the existing whole-file engine and the new streaming engine can
share them. PR-7 will fold the non-streaming path away.

Tests (10, all passing): two-input simple merge, single-RG output
contract, total row count preservation (MC-1), sort-schema mismatch
rejected, window mismatch rejected, output has page-level statistics,
KV metadata propagated (with `qh.num_merge_ops` incremented), all-
empty inputs produce no output, one empty among non-empty handled,
output drainable through `StreamDecoder` round-trip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Newer nightly rustfmt's `wrap_comments` reflows the module-level
doc comments tighter than my local nightly did at the time of the
original push.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
g-talbot force-pushed the gtt/streaming-merge-engine-merger branch from 6226032 to add52f0 on May 8, 2026 13:33